Collaborator

@ad-astra-video ad-astra-video commented Sep 2, 2025

What does this pull request do? Explain your changes. (required)

Adds configurable streaming for the BYOC entrypoint to go-livepeer. It uses the trickle protocol to handle streaming, with entrypoints and outputs from go-livepeer similar to those of live-video-to-video.

Streams can be any or a mix of the following:

  • video ingress via WHIP (with Gateway) or RTMP (with MediaMTX)
  • video egress via RTMP (with MediaMTX) or WHEP (with MediaMTX)
  • SSE data output

Control and Events channels are created for every stream.

Streams are created with a POST request to /ai/stream/start, which starts the stream and reserves capacity with an Orchestrator that provides the BYOC capability. If video ingress is enabled, the client should then start streaming via WHIP or RTMP to the ingress URLs provided in the response. The response also includes URLs for egress video, data, updates (control), and events, as well as the stream_id. The stream_id is an integral part of the URLs used to interact with the stream and is combined with the stream name provided in the /ai/stream/start request.
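
As a rough illustration of the start call described above, here is a minimal client sketch. The request and response schemas are not given in this description, so the body is passed through as opaque JSON and the response is decoded generically. `startStream` and `newFakeGateway` are hypothetical names, the fake server only stands in for a real Gateway so the example runs offline, and a real Gateway may require additional headers or fields that are not shown here.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// startStream POSTs a caller-supplied JSON body to <gateway>/ai/stream/start
// and decodes the JSON response into a generic map, because the exact
// request and response schemas are not spelled out in the description.
func startStream(client *http.Client, gatewayURL string, body []byte) (map[string]any, error) {
	resp, err := client.Post(gatewayURL+"/ai/stream/start", "application/json", bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		msg, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("start failed: %s: %s", resp.Status, msg)
	}
	var out map[string]any
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return nil, err
	}
	return out, nil
}

// newFakeGateway is a stand-in server for this sketch only. The "stream_id"
// key comes from the description; the value returned here is made up.
func newFakeGateway() *httptest.Server {
	mux := http.NewServeMux()
	mux.HandleFunc("/ai/stream/start", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "POST required", http.StatusMethodNotAllowed)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(map[string]any{"stream_id": "example-stream-id"})
	})
	return httptest.NewServer(mux)
}

func main() {
	srv := newFakeGateway()
	defer srv.Close()

	resp, err := startStream(srv.Client(), srv.URL, []byte(`{}`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("stream_id:", resp["stream_id"])
}
```

Decoding into a map keeps the sketch honest about what is unknown; a real client would likely use typed structs once the response fields are fixed.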

Streams are stopped with a POST request to /ai/stream/stop. Orchestrators and Gateways both track the payment balance, and the Gateway adjusts to the Orchestrator's reported balance, which arrives in the new JobTokens provided at each payment interval (every minute). Orchestrators will shut down a stream when its payment balance reaches zero.
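
The shutdown rule above can be sketched as a per-interval debit. This is an illustration under assumptions, not the code in this PR: `chargeInterval`, `intervalsUntilShutdown`, and `rat` are hypothetical helpers, the amounts are made up, and the sketch assumes no new payments arrive between intervals.

```go
package main

import (
	"fmt"
	"math/big"
)

// rat is a small helper for building whole-number rationals.
func rat(n int64) *big.Rat { return big.NewRat(n, 1) }

// chargeInterval debits one payment interval's fee from the balance and
// reports whether the stream may keep running. A remaining balance of zero
// or less means the stream should be shut down.
func chargeInterval(balance, fee *big.Rat) (*big.Rat, bool) {
	remaining := new(big.Rat).Sub(balance, fee)
	return remaining, remaining.Sign() > 0
}

// intervalsUntilShutdown counts how many intervals are charged before the
// balance is exhausted, assuming no further payments arrive.
func intervalsUntilShutdown(balance, fee *big.Rat) int {
	if fee.Sign() <= 0 {
		return -1 // a zero fee never exhausts the balance
	}
	n := 0
	for {
		n++
		var keep bool
		balance, keep = chargeInterval(balance, fee)
		if !keep {
			return n
		}
	}
}

func main() {
	// With a balance of 300 and a fee of 100 per interval, the third
	// charge brings the balance to zero and the stream is stopped.
	fmt.Println(intervalsUntilShutdown(rat(300), rat(100)))
}
```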

Specific updates (required)

  • Add job_stream.go and job_stream_test.go
  • Refactor job_rpc.go to reuse stream setup where it made sense
  • Update the goroutines to ignore in common/testutil.go so that tests pass

How did you test each of these updates (required)

Used byoc-stream to test end to end: https://github.com/ad-astra-video/livepeer-app-pipelines/tree/main/byoc-stream
Added tests to job_stream_test.go and some additional tests to job_rpc_test.go.

Does this pull request close any open issues?

Checklist:

@github-actions github-actions bot added the go (Pull requests that update Go code) and AI (Issues and PR related to the AI-video branch) labels Sep 2, 2025
codecov bot commented Sep 2, 2025

Codecov Report

❌ Patch coverage is 48.92183% with 758 lines in your changes missing coverage. Please review.
✅ Project coverage is 32.93201%. Comparing base (f5e759d) to head (bf2fec3).

| Files with missing lines | Patch % | Lines |
|---|---|---|
| server/job_stream.go | 55.83864% | 341 Missing and 75 partials ⚠️ |
| server/job_rpc.go | 50.95057% | 101 Missing and 28 partials ⚠️ |
| core/external_capabilities.go | 4.81928% | 79 Missing ⚠️ |
| server/ai_live_video.go | 30.55556% | 69 Missing and 6 partials ⚠️ |
| core/livepeernode.go | 0.00000% | 37 Missing ⚠️ |
| core/ai_orchestrator.go | 0.00000% | 7 Missing ⚠️ |
| common/testutil.go | 0.00000% | 6 Missing ⚠️ |
| server/rpc.go | 0.00000% | 4 Missing ⚠️ |
| server/ai_mediaserver.go | 72.72727% | 2 Missing and 1 partial ⚠️ |
| core/accounting.go | 91.30435% | 1 Missing and 1 partial ⚠️ |

Additional details and impacted files

@@                 Coverage Diff                 @@
##              master       #3727         +/-   ##
===================================================
+ Coverage   31.54628%   32.93201%   +1.38573%     
===================================================
  Files            159         160          +1     
  Lines          38984       40283       +1299     
===================================================
+ Hits           12298       13266        +968     
- Misses         25797       25979        +182     
- Partials         889        1038        +149     

| Files with missing lines | Coverage Δ |
|---|---|
| server/ai_process.go | 2.01913% <ø> (ø) |
| core/accounting.go | 94.73684% <91.30435%> (-0.71771%) ⬇️ |
| server/ai_mediaserver.go | 9.53912% <72.72727%> (+2.93024%) ⬆️ |
| server/rpc.go | 68.86228% <0.00000%> (+0.07440%) ⬆️ |
| common/testutil.go | 15.87302% <0.00000%> (-1.67084%) ⬇️ |
| core/ai_orchestrator.go | 30.40629% <0.00000%> (-0.25160%) ⬇️ |
| core/livepeernode.go | 50.53763% <0.00000%> (-12.54962%) ⬇️ |
| server/ai_live_video.go | 32.93769% <30.55556%> (+32.93769%) ⬆️ |
| core/external_capabilities.go | 25.89286% <4.81928%> (-61.60714%) ⬇️ |
| server/job_rpc.go | 44.88491% <50.95057%> (+14.20644%) ⬆️ |

... and 1 more

... and 3 files with indirect coverage changes


Δ = absolute <relative> (impact), ø = not affected, ? = missing data

}
stopJob.sign() //no changes to make, sign job

token, err := sessionToToken(params.liveParams.sess)

Collaborator

Let's check and log this err here.

Collaborator

Still hoping we can check and log this err

Collaborator Author

@ad-astra-video ad-astra-video Sep 12, 2025

Added an error check and log in d73fe37. Note that this function currently cannot return an error; should I just update it to return only the JobToken? I think I added the error return while building it out, thinking that something could possibly cause an error.

@pschroedl pschroedl self-requested a review October 27, 2025 15:24
balance = node.Balances.Balance(orchAddr, core.ManifestID(jobReq.Capability))
}

diff := new(big.Rat).Sub(orchBal, balance)

Collaborator

had some observations and questions here ...

Collaborator

This may be out of scope for this PR, but there are still some legitimate concerns here:

  • What could happen if Orchestrators can influence Gateway accounting?
  • Should there be bounds on balance adjustments?
  • How do we handle legitimate consumption vs. potential manipulation?

I think this is a good chance to think through payment security patterns, such as:

  • What happens if orchToken.Balance is higher than balance? (How do transcoding payment flows handle this?)
  • We should probably consider adding validation logic, e.g. bounds on adjustments.

Collaborator Author

> Maybe out of bounds for this PR, but still maybe some legit concern here:
> What could happen if Orchestrators can influence Gateway accounting?
> Should there be bounds on balance adjustments?
> How do we handle legitimate consumption vs. potential manipulation?

This is certainly a possible threat to Gateways, but it is also a threat on every other pipeline in Livepeer, because the Orchestrator always wins. If an Orchestrator continuously reports no balance, Gateways have to create tickets to pay for it; otherwise the Orchestrator would, in theory, stop work.

The risk to Orchestrators is that the Gateway never uses them again.

> What happens if orchToken.Balance is higher than balance?
> ( how does transcoding payment flows handle this? )

There is no balance reporting back to the Gateway outside of BYOC. For transcoding, all balance is lost when the stream starts, since it is tracked by manifest ID and balances are cleared at the end of a stream. A similar path is used for batch AI requests. BYOC takes a different approach: it uses the capability name as the balance key, so the balance can be shared across requests.

I was using this as a first step toward bidirectional communication on balance, with the plan to tighten it up after some more real-world testing.
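
One way the "bounds on balance adjustments" idea from this thread could look is sketched below. It is not what this PR implements: `boundedAdjustment`, `maxStep`, and `rat` are hypothetical, and the choice of a symmetric per-interval cap is an assumption made only for illustration.

```go
package main

import (
	"fmt"
	"math/big"
)

// rat is a small helper for building whole-number rationals.
func rat(n int64) *big.Rat { return big.NewRat(n, 1) }

// boundedAdjustment returns how far the Gateway would move its local balance
// toward the Orchestrator-reported balance in one step, clamped to the range
// [-maxStep, +maxStep]. maxStep is assumed to be non-negative.
func boundedAdjustment(local, reported, maxStep *big.Rat) *big.Rat {
	diff := new(big.Rat).Sub(reported, local)
	if diff.Cmp(maxStep) > 0 {
		return new(big.Rat).Set(maxStep)
	}
	lower := new(big.Rat).Neg(maxStep)
	if diff.Cmp(lower) < 0 {
		return lower
	}
	return diff
}

func main() {
	local := rat(100)
	// The Orchestrator reports a zero balance, but the Gateway only accepts
	// a reduction of at most 30 per adjustment in this sketch.
	adj := boundedAdjustment(local, rat(0), rat(30))
	fmt.Println("adjustment:", adj.RatString())
	fmt.Println("new local balance:", new(big.Rat).Add(local, adj).RatString())
}
```

A cap like this limits how much a single report can move Gateway accounting, at the cost of slower convergence when the Orchestrator's report is legitimate.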

Collaborator Author

@ad-astra-video ad-astra-video Oct 31, 2025

Made some updates and moved the compare/adjust/report logic in createPayment into a new function on AddressBalances that locks a new mutex. The existing mutex in AddressBalances is taken for each individual operation (Balance/Debit/Credit); the new sharedBalMtx mutex wraps all of those operations to prevent changes while updating.

Also added a lock around payment application in the Orchestrator monitoring goroutine.

See db1df66, spelling fixed in 41bbe78
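
The two-level locking described in this comment might look roughly like the sketch below. Only the names AddressBalances, sharedBalMtx, and the Balance/Debit/Credit operations come from the comment; the struct layout, `CompareAndAdjust`, `newAddressBalances`, and `rat` are assumptions for illustration and do not reflect the code in db1df66.

```go
package main

import (
	"fmt"
	"math/big"
	"sync"
)

// addressBalances is a simplified, hypothetical stand-in for AddressBalances.
type addressBalances struct {
	mtx          sync.Mutex // taken by each individual operation
	sharedBalMtx sync.Mutex // taken around multi-step sequences
	balances     map[string]*big.Rat
}

func newAddressBalances() *addressBalances {
	return &addressBalances{balances: make(map[string]*big.Rat)}
}

// rat is a small helper for building whole-number rationals.
func rat(n int64) *big.Rat { return big.NewRat(n, 1) }

// Balance returns a copy of the current balance for key (zero if unset).
func (a *addressBalances) Balance(key string) *big.Rat {
	a.mtx.Lock()
	defer a.mtx.Unlock()
	if b, ok := a.balances[key]; ok {
		return new(big.Rat).Set(b)
	}
	return new(big.Rat)
}

// Credit adds amount (which may be negative) to the balance for key.
func (a *addressBalances) Credit(key string, amount *big.Rat) {
	a.mtx.Lock()
	defer a.mtx.Unlock()
	cur, ok := a.balances[key]
	if !ok {
		cur = new(big.Rat)
		a.balances[key] = cur
	}
	cur.Add(cur, amount)
}

// CompareAndAdjust reads the local balance, computes the difference to the
// reported balance, and applies it. Holding sharedBalMtx keeps two such
// sequences from interleaving between the read and the write.
func (a *addressBalances) CompareAndAdjust(key string, reported *big.Rat) *big.Rat {
	a.sharedBalMtx.Lock()
	defer a.sharedBalMtx.Unlock()
	diff := new(big.Rat).Sub(reported, a.Balance(key))
	a.Credit(key, diff)
	return diff
}

func main() {
	ab := newAddressBalances()
	ab.Credit("my-capability", rat(100))

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			ab.CompareAndAdjust("my-capability", rat(40))
		}()
	}
	wg.Wait()
	fmt.Println("balance:", ab.Balance("my-capability").RatString())
}
```

Note that in this sketch the outer mutex only serializes callers that also take it; a bare Credit from another goroutine can still land between the read and the write unless that caller goes through a method guarded by sharedBalMtx as well.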


//start payment monitoring
go func() {
stream, _ := h.node.ExternalCapabilities.Streams[orchJob.Req.ID]

Collaborator

This accesses the ExternalCapabilities.Streams map directly without holding the capm mutex. Concurrent map reads and writes can crash the process with a fatal runtime error when streams are added or removed, unless we acquire the lock first. I think we should follow a pattern similar to how StreamExists() is used; adding a helper such as GetStream(streamID) could be useful for this.

Collaborator

Just noticed that we're reusing this. We should probably re-fetch the stream each time instead of reusing the captured stream value.

Collaborator Author

fixed in e9a4c9d
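
For readers following this thread, a locked accessor of the kind suggested above could be sketched as follows. This is not the code from e9a4c9d: `streamRegistry` and `streamInfo` are hypothetical stand-ins for ExternalCapabilities and its stream record, and using a sync.RWMutex for capm is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// streamInfo is a hypothetical stand-in for the real stream record.
type streamInfo struct {
	ID string
}

// streamRegistry is a hypothetical stand-in for ExternalCapabilities.
// The mutex is named capm after the review comment.
type streamRegistry struct {
	capm    sync.RWMutex
	streams map[string]*streamInfo
}

func newStreamRegistry() *streamRegistry {
	return &streamRegistry{streams: make(map[string]*streamInfo)}
}

// AddStream registers a stream under the write lock.
func (r *streamRegistry) AddStream(id string) {
	r.capm.Lock()
	defer r.capm.Unlock()
	r.streams[id] = &streamInfo{ID: id}
}

// RemoveStream deletes a stream under the write lock.
func (r *streamRegistry) RemoveStream(id string) {
	r.capm.Lock()
	defer r.capm.Unlock()
	delete(r.streams, id)
}

// GetStream looks a stream up under the read lock, so callers never touch
// the map directly while other goroutines add or remove entries.
func (r *streamRegistry) GetStream(id string) (*streamInfo, bool) {
	r.capm.RLock()
	defer r.capm.RUnlock()
	s, ok := r.streams[id]
	return s, ok
}

func main() {
	reg := newStreamRegistry()
	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		id := fmt.Sprintf("stream-%d", i)
		wg.Add(2)
		go func() {
			defer wg.Done()
			reg.AddStream(id)
			reg.RemoveStream(id)
		}()
		go func() {
			defer wg.Done()
			reg.GetStream(id) // safe while writers are active
		}()
	}
	wg.Wait()
	_, exists := reg.GetStream("stream-0")
	fmt.Println("stream-0 exists after removal:", exists)
}
```

Calling GetStream on each loop iteration, rather than holding on to an earlier result, also addresses the follow-up point about re-fetching the stream.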

}

clog.Infof(ctx, "Insufficient balance, stopping stream %s for sender %s", orchJob.Req.ID, orchJob.Sender)
_, exists := h.node.ExternalCapabilities.Streams[orchJob.Req.ID]

Collaborator

see previous

Collaborator Author

fixed in e9a4c9d


//check if stream still exists
// if not, send stop to worker and exit monitoring
stream, exists := h.node.ExternalCapabilities.Streams[orchJob.Req.ID]

Collaborator

here too ;)

Collaborator Author

fixed in e9a4c9d

if !exists {
clog.Errorf(ctx, "Stream %s not found", streamId)
return
}

Collaborator

possibly add

    defer ls.LivepeerNode.RemoveLivePipeline(streamId)

to ensure cleanup even if something goes wrong

Collaborator Author

Moved the RemoveLivePipeline call up into a defer statement before the loop. RemoveLivePipeline just deletes from the map and is a no-op if the entry is already deleted. See f8beda8.
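
The defer-before-the-loop pattern mentioned here can be sketched in isolation as below. Only the RemoveLivePipeline name comes from the thread; `pipelineNode`, `runStream`, `Has`, and `errStepFailed` are hypothetical and exist only to make the example self-contained.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errStepFailed is a made-up error used to simulate a failure mid-stream.
var errStepFailed = errors.New("step failed")

// pipelineNode is a hypothetical stand-in for the node's live pipeline map.
type pipelineNode struct {
	mu        sync.Mutex
	pipelines map[string]bool
}

func newPipelineNode(ids ...string) *pipelineNode {
	n := &pipelineNode{pipelines: make(map[string]bool)}
	for _, id := range ids {
		n.pipelines[id] = true
	}
	return n
}

// RemoveLivePipeline deletes the entry; deleting a missing key is a no-op.
func (n *pipelineNode) RemoveLivePipeline(id string) {
	n.mu.Lock()
	defer n.mu.Unlock()
	delete(n.pipelines, id)
}

// Has reports whether the pipeline is still registered.
func (n *pipelineNode) Has(id string) bool {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.pipelines[id]
}

// runStream registers the cleanup before entering the loop, so the pipeline
// is removed on every exit path, including early returns on error.
func runStream(n *pipelineNode, id string, steps []func() error) error {
	defer n.RemoveLivePipeline(id)
	for _, step := range steps {
		if err := step(); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	n := newPipelineNode("stream-1")
	err := runStream(n, "stream-1", []func() error{
		func() error { return nil },
		func() error { return errStepFailed },
	})
	fmt.Println("error:", err, "| still registered:", n.Has("stream-1"))
}
```

Because the removal is idempotent, it is harmless if another code path has already deleted the entry by the time the deferred call runs.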

  priceInfo := sess.OrchestratorInfo.PriceInfo
  var paymentProcessor *LivePaymentProcessor
- if priceInfo != nil && priceInfo.PricePerUnit != 0 {
+ if priceInfo != nil && priceInfo.PricePerUnit != 0 && sess.OrchestratorInfo.AuthToken != nil {

Collaborator

Please add a comment documenting why we check sess.OrchestratorInfo.AuthToken here.

Collaborator Author

See 17357af


//ensure streamRequest is not nil or empty
if streamRequest == nil || len(streamRequest) == 0 {
streamRequest = []byte("{}")

Collaborator

Let's make a note here that we either require an explicit empty JSON object from the caller or fall back to one that is otherwise valid as a default (you said in chat that this was handled upstream?).

Collaborator Author

see 421ba70
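
The defaulting behavior discussed in this thread can be sketched as a small helper. `normalizeStreamRequest` and `errInvalidJSON` are hypothetical names, the `fps` field in the usage is made up, and the JSON validity check goes beyond what the snippet under review does; it is included only to show one way of making the contract explicit.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
)

// errInvalidJSON is returned when a non-empty request is not valid JSON.
var errInvalidJSON = errors.New("stream request is not valid JSON")

// normalizeStreamRequest substitutes an empty JSON object when the caller
// sent nothing (nil, empty, or whitespace only). len() of a nil slice is 0,
// so no separate nil check is needed.
func normalizeStreamRequest(raw []byte) ([]byte, error) {
	if len(bytes.TrimSpace(raw)) == 0 {
		return []byte("{}"), nil
	}
	if !json.Valid(raw) {
		return nil, errInvalidJSON
	}
	return raw, nil
}

func main() {
	inputs := [][]byte{nil, []byte("  "), []byte(`{"fps":30}`), []byte("{")}
	for _, in := range inputs {
		out, err := normalizeStreamRequest(in)
		fmt.Printf("%q -> %q, err=%v\n", in, out, err)
	}
}
```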

Labels

AI Issues and PR related to the AI-video branch. go Pull requests that update Go code

4 participants